package spark.structed_streaming

import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Date

import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Duration, Seconds}
import spark.structed_streaming.TimeTest.getTimestamp

/**
  * Structured Streaming word-count demo.
  *
  * Reads lines of text from a TCP socket (localhost:9999), splits them into
  * space-separated words, stamps every word with the wall-clock time at which
  * it was processed, and prints per-word counts over sliding windows
  * (10 seconds long, advancing every 5 seconds) to the console.
  *
  * @author eureka.wh
  * @since 2019-06-05 16:04
  */
object SparkStructed01 {

  /** A word paired with a timestamp. Not referenced elsewhere in this object. */
  case class Person(timestamp: Timestamp, word: String)

  /**
    * Entry point: builds a local SparkSession, starts the streaming query and
    * blocks until it terminates.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder
      .master("local[2]")
      .appName("SparkStructed01")
      .getOrCreate()

    import spark.implicits._

    // Unbounded stream of text lines read from the socket source.
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    val words = lines.as[String]
      .flatMap(_.split(" "))
      // split(" ") produces empty tokens for blank lines and consecutive
      // spaces; drop them so they are not counted as a word.
      .filter(_.nonEmpty)
      // The timestamp is taken when the record is processed, not when it was produced.
      .map(word => (getCurrentTimestamp, word))
      .toDF("timestamp", "word")

    // Count occurrences of each word per sliding window: 10s long, sliding every 5s.
    val wordCounts = words
      .groupBy(window($"timestamp", "10 seconds", "5 seconds"), $"word")
      .count()

    // "complete" mode re-emits the whole result table on every trigger.
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .option("truncate", false)
      .start()

    // Release the session even when the query ends with an error.
    try query.awaitTermination()
    finally spark.stop()
  }

  /**
    * Get the current system timestamp.
    *
    * @return a new Timestamp holding the current wall-clock time in milliseconds
    */
  def getCurrentTimestamp: java.sql.Timestamp =
    new Timestamp(System.currentTimeMillis())

  /**
    * String --> Timestamp.
    *
    * Parses `x` with the pattern `yyyyMMddHHmmss`.
    *
    * @param x the text to parse
    * @return the parsed Timestamp, or `null` when `x` is null, empty, or cannot
    *         be parsed (the `null` result is kept so existing callers continue
    *         to work)
    */
  def getTimestamp(x: String): java.sql.Timestamp =
    if (x == null || x.isEmpty) {
      // Nothing to parse; handled up front instead of relying on the catch below.
      null
    } else {
      try {
        // SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
        val format = new SimpleDateFormat("yyyyMMddHHmmss")
        new Timestamp(format.parse(x).getTime)
      } catch {
        case scala.util.control.NonFatal(_) =>
          println("parse timestamp wrong")
          null
      }
    }

}
